package com.xiaofan.apitest.tabletest

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Csv, Kafka, Schema}

/**
 * Small Kafka-to-Kafka Table API pipeline: reads CSV records from the `sensor` topic,
 * keeps the `id` and `temp` columns of the rows whose `id` is `sensor_1`, and writes
 * them as CSV to the `sink_test` topic.
 */
object KafkaPipelineTest {

  /** Kafka connector version shared by the source and the sink table. */
  private val KafkaVersion: String = "0.11"

  /** Broker list shared by the source and the sink table. */
  private val BootstrapServers: String =
    "192.168.1.23:9091,192.168.1.24:9091,192.168.1.25:9091"

  /**
   * Registers a temporary table backed by a Kafka topic that uses the CSV format.
   *
   * @param tableEnv  table environment in which the table is registered
   * @param topic     Kafka topic the table is bound to
   * @param schema    column layout of the table
   * @param tableName name under which the temporary table is registered
   */
  private def registerKafkaCsvTable(
      tableEnv: StreamTableEnvironment,
      topic: String,
      schema: Schema,
      tableName: String): Unit =
    tableEnv
      .connect(
        new Kafka()
          .version(KafkaVersion)
          .topic(topic)
          .property("bootstrap.servers", BootstrapServers)
      )
      .withFormat(new Csv())
      .withSchema(schema)
      .createTemporaryTable(tableName)

  /**
   * Builds the pipeline and executes the insert into the Kafka output table.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // Source table: sensor readings from the "sensor" topic.
    registerKafkaCsvTable(
      tableEnv,
      topic = "sensor",
      schema = new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temp", DataTypes.DOUBLE()),
      tableName = "kafkaInputTable"
    )

    // Read the data from the external system.
    val sensorTable: Table = tableEnv.from("kafkaInputTable")

    // Simple transformation: project two columns, then keep a single sensor.
    // NOTE(review): a non-windowed groupBy/count over sensorTable would presumably be an
    // updating result that this Kafka sink cannot accept - confirm before adding one here.
    val resultTable: Table = sensorTable
      .select($"id", $"temp")
      .filter($"id" === "sensor_1")

    // Sink table: the second column is named "temperature" here but "temp" in the query.
    // NOTE(review): columns are presumably matched by position on insert - confirm.
    registerKafkaCsvTable(
      tableEnv,
      topic = "sink_test",
      schema = new Schema()
        .field("id", DataTypes.STRING())
        .field("temperature", DataTypes.DOUBLE()),
      tableName = "kafkaOutputTable"
    )

    // Write the data to the external system.
    resultTable.executeInsert("kafkaOutputTable")

  }
}
